package net.maergaiyun.func;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import net.maergaiyun.model.ShortLinkWideDO;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Supplier;


@Slf4j
public class AsyncLocationRequestFunction extends RichAsyncFunction<ShortLinkWideDO, String> {
    private static final String IP_PARSE_URL = "https://restapi.amap.com/v3/ip?ip=%s&output=json&key=995b62fbaef37f1eee3ad94f54a36f8d";

    private CloseableHttpAsyncClient httpAsyncClient;

    @Override
    public void timeout(ShortLinkWideDO shortLinkWideDO, ResultFuture<String> resultFuture) throws Exception {
        shortLinkWideDO.setProvince("-");
        shortLinkWideDO.setCity("-");
        resultFuture.complete(Collections.singleton(JSON.toJSONString(shortLinkWideDO)));
//        resultFuture.complete(Collections.singleton(null));
    }

    @Override
    public void close() throws Exception {
        if (this.httpAsyncClient != null) {
            this.httpAsyncClient.close();
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        this.httpAsyncClient = createAsyncHttpClient();
    }

    @Override
    public void asyncInvoke(ShortLinkWideDO shortLinkWideDO, ResultFuture<String> resultFuture) {
        String ip = shortLinkWideDO.getIp();
        String url = String.format(IP_PARSE_URL, ip);
        HttpGet httpGet = new HttpGet(url);

        Future<HttpResponse> future = httpAsyncClient.execute(httpGet, null);

        CompletableFuture<ShortLinkWideDO> completableFuture = CompletableFuture.supplyAsync(new Supplier<ShortLinkWideDO>() {
            @Override
            public ShortLinkWideDO get() {
                try {
                    HttpResponse response = future.get();

                    int statusCode = response.getStatusLine().getStatusCode();
                    if (statusCode == HttpStatus.SC_OK) {
                        HttpEntity entity = response.getEntity();

                        String result = EntityUtils.toString(entity, "UTF-8");
                        JSONObject locationObj = JSON.parseObject(result);

                        String province = locationObj.getString("province");
                        String city = locationObj.getString("city");
                        shortLinkWideDO.setProvince(province);
                        shortLinkWideDO.setCity(city);
                        return shortLinkWideDO;
                    }
                } catch (InterruptedException | ExecutionException | IOException e) {
                    log.error("ip解析错误, shortLinkWideDO={}, e={}", shortLinkWideDO, e.getMessage());
                }

                shortLinkWideDO.setProvince("-");
                shortLinkWideDO.setCity("-");
                return shortLinkWideDO;
            }
        });

        completableFuture.thenAccept(new Consumer<ShortLinkWideDO>() {
            @Override
            public void accept(ShortLinkWideDO shortLinkWideDO) {
                resultFuture.complete(Collections.singleton(JSON.toJSONString(shortLinkWideDO)));
            }
        });

//        completableFuture.thenAccept((result) -> {
//            resultFuture.complete(Collections.singleton(JSON.toJSONString(shortLinkWideDO)));
//        });
    }

    private CloseableHttpAsyncClient createAsyncHttpClient() throws Exception {
        try {
            RequestConfig requestConfig = RequestConfig.custom()
                    // 返回数据的超时时间
                    .setSocketTimeout(20000)
                    // 连接上服务器的超时时间
                    .setConnectTimeout(10000)
                    // 从连接池中获取连接的超时时间
                    .setConnectionRequestTimeout(1000).build();

            ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();

            PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
            // 设置连接池最大是500个连接
            connManager.setMaxTotal(500);
            // MaxPerRoute是对maxTotal的细分, 每个主机最大并发是300, route是指域名
            connManager.setDefaultMaxPerRoute(300);

            CloseableHttpAsyncClient httpClient = HttpAsyncClients.custom().setConnectionManager(connManager).setDefaultRequestConfig(requestConfig).build();
            httpClient.start();
            return httpClient;

        } catch (IOReactorException e) {
            log.error("初始化CloseableHttpAsyncClient异常: {}", e.getMessage());
            return null;
        }
    }
}
